package org.luosl.webmagicx.pipeline

import java.sql.{Connection, ResultSet}
import java.util

import com.zaxxer.hikari.HikariConfig
import org.luosl.webmagicx.utils.DbTools
import org.luosl.webmagicx.conf.PropType._
import org.luosl.webmagicx.conf.{ConfException, XmlProps, SpiderConf}
import org.luosl.webmagicx.pipeline.component.{Distinct, HashSetDistinct}
import org.luosl.webmagicx.conf.MatcherConverters._
import us.codecraft.webmagic.{ResultItems, Task}
import com.zaxxer.hikari.HikariDataSource

import scala.collection.JavaConverters._

/**
  * Pipeline that writes crawled result items into a database table through JDBC.
  *
  * Properties read from `props` in this class:
  *  - `tableName`: name of the target table.
  *  - `url`, `driver`, `user`, `password`: passed to the HikariCP configuration.
  *  - `needSaveFields` (default `*`): `*` selects `allFields`; otherwise a list separated by
  *    `,` or `，` whose entries are either `field` or `field:dbColumn`.
  *  - `autoCreateTable` (default `true`): create the table when it does not exist.
  *  - `model` (default `append`): `append` keeps existing rows, `override` deletes them first.
  *  - `distinct`: optional; when present both its `field` and `dbColumn` attributes are required.
  *
  * Created by luosl on 2017/12/11.
  */
class SimpleJdbcPipeline(sc:SpiderConf, task:Task, props:XmlProps) extends AbstractPipeline(sc, task, props) {

  /**
    * Name of the database table rows are inserted into.
    */
  private val tableName:String = props.value("tableName", "value")(strType)

  /**
    * Connection pool built from the `url`, `driver`, `user` and `password` properties.
    */
  private val dataSource:HikariDataSource = {
    val hikariConfig = new HikariConfig()
    hikariConfig.setJdbcUrl(props.value("url", "value")(strType))
    hikariConfig.setDriverClassName(props.value("driver", "value")(strType))
    hikariConfig.setUsername(props.value("user", "value")(strType))
    hikariConfig.setPassword(props.value("password", "value")(strType))
    new HikariDataSource(hikariConfig)
  }

  /**
    * The parameterised insert statement, the database column names and the result-item
    * field names. `dbColumns` and `fields` are aligned by position: the value of
    * `fields(i)` is bound to `dbColumns(i)`.
    */
  private val (insertSql, dbColumns, fields):(String, List[String], List[String]) = {
    val needSaveFields:String = props.valueOrDefault("needSaveFields", "value")(strType)("*")
    // Resolve the (field, dbColumn) pairs; "*" maps every known field onto a column of the same name.
    val (fieldNames, columnNames):(List[String], List[String]) =
      if(needSaveFields == "*"){
        (allFields, allFields)
      }else{
        needSaveFields.split(",|，").toList.map { item =>
          // Trim each part so that "a, b:c" does not produce names with stray whitespace.
          item.split(":").map(_.trim) match {
            case Array(fieldName) => (fieldName, fieldName)
            case Array(fieldName, dbColumnName) => (fieldName, dbColumnName)
            case _ => throw ConfException("无效的needSaveFields表达式")
          }
        }.unzip
      }
    // Build the insert statement with one "?" placeholder per column.
    val dbFields:String = columnNames.mkString(",")
    val dbParms:String = columnNames.map(_ => "?").mkString(",")
    val sql:String = s"insert into $tableName($dbFields) values($dbParms)"
    (sql, columnNames, fieldNames)
  }

  // Prepare the table during construction. If that fails the pool created above would
  // otherwise stay open, because nobody receives an instance to call close() on.
  try {
    checkOrCreateTable()
  } catch {
    case e:Exception =>
      DbTools.close(dataSource)
      throw e
  }

  /**
    * Optional de-duplication component, configured through the `distinct` property.
    * Either both `field` and `dbColumn` are given, or neither; anything else is a
    * configuration error.
    */
  private val distinctOptVal:Option[Distinct] = {
    val fieldOpt:Option[String] = props.valueOption("distinct", "field")(strType)
    val dbColumnOpt:Option[String] = props.valueOption("distinct", "dbColumn")(strType)
    (fieldOpt, dbColumnOpt) match {
      case (None, None) => None
      case (Some(field), Some(dbColumn)) =>
        // Cache loader: adds the first column of every row of the query below to the given set.
        val loadCacheOp:util.HashSet[Any] => Unit = (set:util.HashSet[Any]) => {
          val sql = s"select $dbColumn from $tableName"
          val sqlOp:ResultSet=>Unit = (rs:ResultSet) => while(rs.next()) set.add(rs.getObject(1))
          // The query is run with autoClose = false, so the borrowed connection has to be
          // handed back to the pool here; otherwise it is never released.
          val connection:Connection = conn()
          try {
            DbTools.executeQuery(sql, connection, sqlOp, autoClose = false)
          } finally {
            connection.close()
          }
        }
        // Explicit type argument: without it the type parameter of the Java generic `get`
        // is inferred as Nothing, which risks a ClassCastException at the call site.
        val distValOp:ResultItems => Any = (ris:ResultItems) => ris.get[Any](field)
        Option(new HashSetDistinct(loadCacheOp, distValOp))
      case _ => throw ConfException("jdbcPipeline 的 distinct 标签必须同时配置 field 属性和 dbColumn 属性")
    }
  }


  /**
    * Returns the de-duplication component, if one is configured.
    *
    * @return the configured [[Distinct]], or `None` when the `distinct` property is absent
    */
  override def distinctOpt(): Option[Distinct] = distinctOptVal

  /**
    * Obtains a connection from the pool. The caller is responsible for closing it.
    *
    * @return a connection from the data source
    */
  def conn():Connection = dataSource.getConnection

  /**
    * Creates the table when it is missing and `autoCreateTable` is enabled, then applies
    * the configured `model`: `append` leaves existing rows untouched, `override` deletes
    * all rows of the table.
    *
    * A failure of the create statement itself is logged and not rethrown; an unsupported
    * driver or an invalid `model` raises a RuntimeException.
    */
  private def checkOrCreateTable(): Unit ={
    /**
      * Generates the create-table statement; every column gets the large-text type
      * chosen for the configured driver.
      *
      * @return the create-table statement
      */
    def genCreateTableSql():String = {
      val dbType:String = props.value("driver", "value")(strType) match {
        // Both the legacy and the current driver class names are accepted.
        case "com.mysql.jdbc.Driver" | "com.mysql.cj.jdbc.Driver" => "longtext"
        case "oracle.jdbc.driver.OracleDriver" | "oracle.jdbc.OracleDriver" => "clob"
        case str:String => throw new RuntimeException(s"暂不支持创建表的数据库类型:[driver=$str],请手动创建映射表")
      }
      s"""
         |CREATE TABLE $tableName (
         |${dbColumns.map(cln=> s"$cln $dbType").mkString(",")}
         |)
        """.stripMargin
    }
    val tableExist:Boolean = DbTools.tableExist(tableName,conn(),autoClose = true)
    // Table creation
    val autoCreateTable:Boolean = props.valueOrDefault("autoCreateTable", "value")(booleanType)(true)
    if(!tableExist && autoCreateTable){
      // Create the table because it does not exist yet
      val createTableSql:String = genCreateTableSql()
      logInfo(s"开始创建数据库表:$tableName ,[sql=$createTableSql]")
      try {
        DbTools.executeUpdate(createTableSql, conn(), autoClose = true)
      }catch {
        case e:Exception => logError(s"执行建表语句:$createTableSql 失败!",e)
      }
    }
    // Write mode handling (the property key is spelled "model")
    val mode:String = props.valueOrDefault("model", "value")(strType)("append")
    mode match {
      case "append" => logInfo("当前Model为:append")
      case "override" =>
        logInfo("当前Model为:override，开始执行 delete 操作...")
        DbTools.executeUpdate(s"delete from $tableName", conn(), autoClose = true)
      case other:String =>
        val msg:String = s"无效的Model:$other,jdbcPipeline的[model]属性只支持[append,override]"
        throw new RuntimeException(msg)
    }
  }

  /**
    * Inserts one row built from the given result items.
    *
    * @param resultItems items whose values are looked up by the configured field names;
    *                    a field missing from the items is bound as null
    * @param task        task (not used here)
    */
  override def save(resultItems: ResultItems, task: Task): Unit = {
    val values = resultItems.getAll.asScala
    // Parameters follow the order of `fields`, which matches the column order of insertSql.
    val params:List[Any] = fields.map(field => values.getOrElse(field, null))
    DbTools.executeUpdate(insertSql, conn(), true, params:_*)
  }

  /**
    * Closes the connection pool.
    */
  override def close(): Unit = DbTools.close(dataSource)
}
